Converting a Spark RDD to a DataFrame can be done by using:
- toDF()
- createDataFrame()
**toDF()** has another signature which takes arguments for custom column names, as shown below.
val rdd = sc.parallelize(Seq(
  ("first", Array(2.0, 1.0, 2.1, 5.4)),
  ("test", Array(1.5, 0.5, 0.9, 3.7)),
  ("choose", Array(8.0, 2.9, 9.1, 2.5))))
val rdd1 = rdd.toDF("id","value")
rdd1.show
val df = sc.parallelize(1 to 100)
  .map(a => (s"user$a", a * .123, a))
  .toDF("name", "score", "user_id")
df.show
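As a side note, when toDF() is called without arguments, Spark falls back to the generic tuple field names `_1`, `_2`, and so on. A minimal sketch (assuming a local SparkSession; in spark-shell, `spark` and `sc` already exist):

```scala
import org.apache.spark.sql.SparkSession

// Sketch only: build a local session for a standalone run.
val spark = SparkSession.builder.appName("toDF-defaults").master("local[*]").getOrCreate()
import spark.implicits._

// Without explicit names, tuple columns default to _1, _2, ...
val df = spark.sparkContext.parallelize(Seq(("a", 1), ("b", 2))).toDF()
df.printSchema  // schema shows columns _1 and _2
```

This is why the custom-name signature of toDF() above is usually preferable for anything beyond quick exploration.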
By default, the data type of each column is inferred from the data. We can change this behavior by supplying a schema, where we specify a column name, data type, and nullable flag for each field/column. For example, a schema could declare a number column as not nullable and a word column as nullable.
val rdd = sc.parallelize(Seq(("first", Array(2.0, 1.0, 2.1, 5.4)),
("test", Array(1.5, 0.5, 0.9,3.7)),
("choose", Array(8.0, 2.9, 9.1, 2.5))))
val rdd1 = rdd.toDF("id","value")
rdd1.printSchema
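Since the code above only uses toDF(), here is a hedged sketch of what supplying a schema with explicit nullability looks like; the `number`/`word` column names and types are illustrative, not part of the original example:

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// Sketch only: local session for a standalone run.
val spark = SparkSession.builder.appName("schema-nullable").master("local[*]").getOrCreate()

// Illustrative data: the nullable "word" column may contain null.
val rows: RDD[Row] = spark.sparkContext.parallelize(Seq(Row(1, "one"), Row(2, null)))

// The schema declares "number" as not nullable and "word" as nullable.
val schema = StructType(Seq(
  StructField("number", IntegerType, nullable = false),
  StructField("word", StringType, nullable = true)))

val df = spark.createDataFrame(rows, schema)
df.printSchema  // number: nullable = false, word: nullable = true
```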
**createDataFrame()**: Using createDataFrame() from SparkSession is another way to create a DataFrame. It takes an RDD object as an argument and can be chained with toDF() to specify names for the columns. Given an RDD containing Rows, it creates a DataFrame using the supplied schema.
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{DoubleType, StringType, StructField, StructType}
val rowsRdd: RDD[Row] = sc.parallelize(Seq(
Row("first", 2.0, 7.0),
Row("second", 3.5, 2.5),
Row("third", 7.0, 5.9)))
val schema = new StructType()
.add(StructField("id", StringType, true))
.add(StructField("val1", DoubleType, true))
.add(StructField("val2", DoubleType, true))
val df = spark.createDataFrame(rowsRdd, schema)
df.show()
df.printSchema
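The "chain with toDF()" variant mentioned above can be sketched as follows: createDataFrame() on an RDD of tuples infers a schema from the tuple types, and toDF() then renames the columns (a sketch assuming a SparkSession named `spark`; the column names are illustrative):

```scala
import org.apache.spark.sql.SparkSession

// Sketch only: local session for a standalone run.
val spark = SparkSession.builder.appName("createDataFrame-toDF").master("local[*]").getOrCreate()

// createDataFrame infers the schema from the tuple types;
// toDF then assigns the column names.
val rdd = spark.sparkContext.parallelize(Seq(("first", 2.0), ("second", 3.5)))
val df = spark.createDataFrame(rdd).toDF("id", "val1")
df.show()
```

This avoids writing Row objects and a StructType by hand, at the cost of accepting the inferred types and nullability.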
Example
import org.apache.spark.sql._
import org.apache.spark.sql.types._
val appName = "Scala Example - List to Spark Data Frame"
val master = "local"
/* Create a Spark session. */
val spark = SparkSession.builder.appName(appName).master(master).getOrCreate()
/* List */
val data = List(Row("Category A", 100, "This is category A"),
Row("Category B", 120, "This is category B"),
Row("Category C", 150, "This is category C"))
val schema = StructType(List(
StructField("Category", StringType, true),
StructField("Count", IntegerType, true),
StructField("Description", StringType, true)))
/* Convert list to RDD */
val rdd = spark.sparkContext.parallelize(data)
/* Create data frame */
val df = spark.createDataFrame(rdd, schema)
print(df.schema)
df.show()
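For comparison, the same frame can be built without an explicit Row list or StructType by letting Spark infer the types from a Seq of tuples (a sketch; the inferred nullability may differ from the explicit schema above):

```scala
import org.apache.spark.sql.SparkSession

// Sketch only: local session for a standalone run.
val spark = SparkSession.builder.appName("list-to-df").master("local[*]").getOrCreate()
import spark.implicits._

// toDF on a Seq of tuples infers String/Int types automatically.
val df = Seq(
  ("Category A", 100, "This is category A"),
  ("Category B", 120, "This is category B"),
  ("Category C", 150, "This is category C")
).toDF("Category", "Count", "Description")
df.show()
```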